package com.ybw.order.demo.service.impl;

import com.ybw.order.demo.config.mq.MqTopicConfig;
import com.ybw.order.demo.constant.OrderConstant;
import com.ybw.order.demo.constant.mq.MessageDelayLevel;
import com.ybw.order.demo.constant.mq.MqConstant;
import com.ybw.order.demo.constant.mq.MqRetryConstant;
import com.ybw.order.demo.dto.HandleStockStatusDTO;
import com.ybw.order.demo.service.SenderService;
import com.ybw.order.demo.utils.GsonUtils;
import com.ybw.order.demo.utils.MyStringUtils;
import com.ybw.order.demo.utils.RetryUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
 * @author ybwei
 * @Title: Sender.java
 * @ProjectName com.spring.pro.rocketmq
 * @Description:
 * @date 2019年5月30日 下午7:25:08
 */
@Service
@Slf4j
public class SenderServiceImpl implements SenderService {

    @Resource
    private RocketMQTemplate rocketMQTemplate;
    @Resource
    private MqTopicConfig mqTopicConfig;

    @Override
    public void sendOrderExpireTime(String ordNo) {
        Message<String> message = MessageBuilder.withPayload(ordNo)
                .setHeader(MessageConst.PROPERTY_KEYS, MyStringUtils.generateUUID())
                .setHeader(MessageConst.PROPERTY_PRODUCER_GROUP, mqTopicConfig.getOrderExpireTimeProducerGroup())
                .build();
        rocketMQTemplate.asyncSend(mqTopicConfig.getOrderExpireTime(), message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("sendOrderExpireTime async sendResult:{}", GsonUtils.toJsonString(sendResult));
            }

            @Override
            public void onException(Throwable e) {
                log.error("消息发送失败,消息体:{}", GsonUtils.toJsonString(message), e);
            }

        }, MqConstant.TIMEOUT, MessageDelayLevel.NINE);
    }

    /**
     * 顺序发送
     *
     * @param topic
     * @param message
     * @param hashKey
     * @methodName: syncSendOrderly
     * @return: org.apache.rocketmq.client.producer.SendResult
     * @author: ybw
     * @date: 2022/8/17
     **/
    private SendResult syncSendOrderly(String topic, Message<?> message, String hashKey) {
        try {
            return (SendResult) RetryUtils.retry(MqRetryConstant.MAX_SEND_RETRY_COUNT, () -> rocketMQTemplate.syncSendOrderly(topic, message, hashKey));
        } catch (Exception e) {
            throw e;
        }
    }

    @Override
    public void sendOrderHandleStockStatus(HandleStockStatusDTO handleStockStatusDTO) {
        Message<HandleStockStatusDTO> message = MessageBuilder.withPayload(handleStockStatusDTO)
                .setHeader(MessageConst.PROPERTY_KEYS, MyStringUtils.generateUUID())
                .setHeader(MessageConst.PROPERTY_PRODUCER_GROUP, mqTopicConfig.getOrderHandleStockStatusProducerGroup())
                .build();
        String hashKey = new StringBuilder(OrderConstant.MqPre.HANDLE_STOCK_STATUS).append(handleStockStatusDTO.getGoodsId()).toString();
        SendResult sendResult = syncSendOrderly(mqTopicConfig.getOrderHandleStockStatus(), message, hashKey);
        log.info("sendResult:{}",GsonUtils.toJsonString(sendResult));
    }

}
